WebSocketClient.resetPing   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 12
rs 9.8
c 0
b 0
f 0
cc 4
1
import './types'
2
import { Zipnum } from 'zipnum'
3
import { add_event, rm_event, sett } from './utils'
4
import { processConfig } from './config'
5
import { AnyFunc, both, callWith, F, isNil, notf, once, qfilter, T, typeIs } from 'pepka'
6
7
const MAX_32 = 2**31 - 1
8
const zipnum = new Zipnum()
9
const callit = callWith([])
10
const isNumber = both(typeIs('Number'), notf(isNaN))
11
const ping_send_opts: wsc.SendOptions = {_is_ping: true}
12
13
type EventHandler<T extends keyof WebSocketEventMap> = AnyFunc<any, [WebSocketEventMap[T]]>
14
type EventHandlers = {
15
  open: EventHandler<'open'>[]
16
  close: EventHandler<'close'>[]
17
  error: EventHandler<'error'>[]
18
  message: AnyFunc<any, [WebSocketEventMap['message'] & {data: any}]>[]
19
  timeout: AnyFunc<any, [data: any]>[]
20
}
21
22
class WebSocketClient {
23
  private ws: wsc.Socket|null = null
24
  private intentionally_closed = false
25
  private reconnect_timeout: NodeJS.Timeout|null = null
26
  private queue: Record<string, wsc.Message> = {}
27
  private onReadyQueue: AnyFunc[] = []
28
  private onCloseQueue: AnyFunc[] = []
29
  private handlers: EventHandlers = { open: [], close: [], message: [], error: [], timeout: [] }
30
  private config = <wsc.Config>{}
31
  private ping_timer: NodeJS.Timeout|null = null
32
  private idle_timer: NodeJS.Timeout|null = null
33
  private get opened() { return this.ws?.readyState===1 }  // The only opened state.
34
35
  private init_flush(): void {
36
    // TODO: reject them or save somehow ?..
37
    qfilter(F, this.queue)
38
  }
39
  private call(event_name: wsc.WSEvent, ...args: any[]) {
40
    for(const h of this.handlers[event_name]) h(...args)
41
  }
42
43
  private log(event: string, message: any = null, time: number|null = null): void {
44
    const {config} = this
45
    if(time === null)
46
      if(config.timer) config.log(event, null, message)
47
      else config.log(event, message)
48
    else
49
      config.log(event, time, message)
50
  }
51
52
  private resetPing() {
53
    const {config: {ping}, ping_timer} = this
54
    if(ping) {
55
      if(!isNil(ping_timer))
56
        clearTimeout(ping_timer as NodeJS.Timeout)
57
      this.ping_timer = sett(ping.interval*1e3, async () => {
58
        const {ping_timer, opened} = this
59
        if(opened) {
60
          await this.send(ping.content, ping_send_opts)
61
          this.resetPing()
62
        } else clearTimeout(ping_timer!)
63
      })
64
    }
65
  }
66
67
  private resetIdle() {
68
    const {config: {max_idle_time: time}, idle_timer} = this
69
    if(time!==Infinity) {
70
      if(!isNil(idle_timer)) clearTimeout(idle_timer!)
71
      this.idle_timer = sett(time*1e3, () => this.opened && this.close())
72
    }
73
  }
74
75
  private initSocket(ws: wsc.Socket) {
76
    const {queue, config} = this
77
    this.ws = ws
78
    this.onReadyQueue.forEach((fn: Function) => fn())
79
    this.onReadyQueue.splice(0)
80
    const {id_key, data_key} = config.server
81
    // Works also on previously opened sockets that do not fire 'open' event.
82
    this.call('open', ws)
83
    for(const msg_id in queue) ws.send(queue[msg_id].msg)
84
    if(this.reconnect_timeout !== null) {
85
      clearInterval(this.reconnect_timeout)
86
      this.reconnect_timeout = null
87
    }
88
    this.resetPing(); this.resetIdle()
89
    add_event(ws, 'close', async (...e) => {
90
      this.log('close')
91
      this.ws = null
92
      this.onCloseQueue.forEach(callit)
93
      this.onCloseQueue.splice(0)
94
      this.call('close', ...e)
95
      // Auto reconnect.
96
      let {reconnect, reconnection_attempts} = config
97
      if(isNumber(reconnect)) {
98
        const reconnectFunc = async () => {
99
          if(this.intentionally_closed || !reconnection_attempts) return;
100
          reconnection_attempts--
101
          this.log('reconnect')
102
          if(!isNil(this.ws)) {
103
            this.ws!.close()
104
            this.ws = null
105
          }
106
          // If some error occured, try again.
107
          const status = await this.connect()
108
          if(!isNil(status))
109
            this.reconnect_timeout = setTimeout(reconnectFunc, reconnect*1e3)
110
        }
111
        // TODO: test normal close by server. Would it be infinite ?
112
        reconnectFunc()
113
      }
114
    })
115
    add_event(ws, 'message', (e) => {
116
      try {
117
        const data = config.decode(e.data)
118
        this.call('message', {...e, data})
119
        if(data[id_key]) {
120
          const q = this.queue[data[id_key]]
121
          if(q) {
122
            // Debug, Log.
123
            const time = q.sent_time ? (Date.now() - q.sent_time) : null
124
            this.log('message', data[data_key], time)
125
            // Play.
126
            q.ff(data[data_key])
127
          }
128
        }
129
      } catch (err) {
130
        console.error(err, `WSP: Decode error. Got: ${e.data}`)
131
      }
132
      this.resetPing()
133
      this.resetIdle()
134
    })
135
  }
136
137
  private opening = false
138
  private connect() { // returns status if won't open or null if ok.
139
    return new Promise<null|number>((ff) => {
140
      if(this.opened||this.opening) return ff(null)
141
      this.opening = true
142
      const config = this.config
143
      const ws = config.socket || config.adapter(config.url, config.protocols)
144
      if(!ws || ws.readyState > 1) {
145
        this.opening = false
146
        this.ws = null
147
        this.log('error', 'ready() on closing or closed state! status 2.')
148
        return ff(2)
149
      }
150
      const ffo = once((s: null|number) => {this.opening=false; ff(s)})
151
      add_event(ws, 'error', once((e) => {
152
        this.ws = null
153
        this.log('error', 'status 3. Err: '+e.message)
154
        this.call('error', e)
155
        // Some network error: Connection refused or so.
156
        ffo(3)
157
      }))
158
      // Because 'open' won't be envoked on opened socket.
159
      if(ws.readyState) {
160
        this.initSocket(ws)
161
        ffo(null)
162
      } else {
163
        add_event(ws, 'open', once(() => {
164
          this.log('open')
165
          this.initSocket(ws)
166
          ffo(null)
167
        }))
168
      }
169
    })
170
  }
171
  public get socket() { return this.ws }
172
  public async ready() {
173
    return new Promise<void>((ff) => {
174
      if(this.opened) ff()
175
      else this.onReadyQueue.push(ff)
176
    })
177
  }
178
  public on(
179
    event_name: wsc.WSEvent,
180
    handler: (data: any) => any,
181
    predicate: (data: any) => boolean = T,
182
    raw = false
183
  ) {
184
    const _handler: wsc.EventHandler = (event) =>
185
      predicate(event) && handler(event)
186
    if(raw) add_event(this.ws as wsc.Socket, event_name, _handler)
187
    else this.handlers[event_name].push(_handler)
188
    return _handler
189
  }
190
  public off(
191
    event_name: wsc.WSEvent,
192
    handler: (data: any) => any,
193
    raw = false
194
  ) {
195
    if(raw) return rm_event(this.ws as wsc.Socket, event_name, handler)
196
    const handlers = this.handlers[event_name]
197
    const i = handlers.indexOf(handler)
198
    if(~i) handlers.splice(i, 1)
199
  }
200
201
  public async close(): wsc.AsyncErrCode {
202
    return new Promise((ff, rj) => {
203
      if(this.ws === null) {
204
        rj('WSP: closing a non-inited socket!')
205
      } else {
206
        this.onCloseQueue.push(() => {
207
          this.init_flush()
208
          ff(null)
209
        })
210
        this.ws.close()
211
        this.ws = null
212
        this.intentionally_closed = true
213
      }
214
    })
215
  }
216
217
  public open() {
218
    if(!this.opened) {
219
      this.intentionally_closed = false
220
      return this.connect()
221
    }
222
  }
223
224
  /**  .send(your_data) wraps request to server with {id: `hash`, data: `actually your data`},
225
    returns a Promise that will be rejected after a timeout or
226
    resolved if server returns the same signature: {id: `same_hash`, data: `response data`}.
227
  */
228
  public async send<RequestDataType = any, ResponseDataType = any>(
229
    message_data: RequestDataType,
230
    opts = <wsc.SendOptions>{}
231
  ): Promise<ResponseDataType> {
232
    this.log('send', message_data)
233
    const {config, queue} = this
234
    const message = {}
235
    const {pipes, server: {data_key}} = config
236
    const {top, _is_ping} = opts
237
238
    const message_id = zipnum.zip((Math.random()*(MAX_32-10))|0)
239
    if(typeof top === 'object') {
240
      if(top[data_key]) {
241
        throw new Error('Attempting to set data key/token via send() options!')
242
      }
243
      Object.assign(message, top)
244
    }
245
    for(const pipe of pipes) message_data = pipe(message_data)
246
    const [msg, err] = await Promise.all([
247
      config.encode(message_id, message_data, config),
248
      this.connect()
249
    ])
250
    if(err) throw new Error('ERR while opening connection #'+err)
251
    if(this.opened) {
252
      this.ws!.send(msg)
253
      if(!_is_ping) this.resetPing()
254
      this.resetIdle()
255
    }
256
257
    return new Promise((ff, rj) => {
258
      this.queue[message_id] = {
259
        msg, ff(x: any) {
260
          clearTimeout(this.timeout) // from this object!
261
          delete queue[message_id]
262
          ff(x)
263
        },
264
        data_type: config.data_type,
265
        sent_time: config.timer ? Date.now() : null,
266
        timeout: sett(config.timeout, () => {
267
          if(message_id in this.queue) {
268
            this.call('timeout', message_data)
269
            rj({'Websocket timeout expired': config.timeout, 'for the message': message_data})
270
            delete queue[message_id]
271
          }
272
        })
273
      }
274
    })
275
  }
276
277
  // TODO: Add .on handlers to config!
278
  constructor(user_config: wsc.UserConfig = {}) {
279
    this.config = processConfig(user_config)
280
    if(!this.config.lazy) this.connect()
281
  }
282
}
283
284
/* TODO: v3: @.deprecated. Use named import { WebSocketClient } instead. */
285
export default WebSocketClient